Skip to content

feat: replace SCAN with sorted set index for time-based schedule lookups#121

Open
tmkarthi wants to merge 3 commits intotaskiq-python:mainfrom
8loop-ai:main
Open

feat: replace SCAN with sorted set index for time-based schedule lookups#121
tmkarthi wants to merge 3 commits intotaskiq-python:mainfrom
8loop-ai:main

Conversation

@tmkarthi
Copy link

Summary

  • Replace redis.scan_iter() in _get_previous_time_schedules() with ZRANGEBYSCORE on a sorted set time index ({prefix}:time_index), reducing lookup complexity from O(N) over the entire Redis keyspace to O(log(K) + M) where K is the
    number of time entries and M is matching results
  • Fix missing self._is_first_run guard in get_schedules() — without it, _get_previous_time_schedules() runs on every scheduler tick, not just the first
  • Add lazy cleanup for stale time index entries with a 1-hour safety window to avoid race conditions between concurrent add_schedule and delete_schedule at the same minute
  • Accept current_time parameter in _get_previous_time_schedules() to prevent window overlap with the caller's already-captured timestamp

Problem

The scheduler calls get_schedules() every N seconds (default 60). On the first run, _get_previous_time_schedules() uses redis.scan_iter("{prefix}:time:*") to find past schedules. SCAN iterates over every key in the Redis database
and pattern-matches — if the Redis instance has millions of keys (result backends, broker streams, caches), this becomes extremely slow and can overwhelm Redis.

Additionally, get_schedules() was missing the _is_first_run check when skip_past_schedules=False, causing the expensive SCAN to execute repeatedly instead of just once.

Solution

Maintain a Redis sorted set ({prefix}:time_index) as a secondary index. Scores are UTC timestamps (truncated to the minute). add_schedule() does a ZADD alongside the existing RPUSH, and lookups use ZRANGEBYSCORE instead of SCAN.

A populate_time_index constructor parameter (default False) enables a one-time SCAN on startup to backfill the index from existing {prefix}:time:* keys, for migrating from older versions.

Test plan

  • Existing tests pass
  • Schedule a time-based task, verify it is picked up by the scheduler
  • Verify SCAN is not called during normal scheduler operation (monitor with redis-cli MONITOR)
  • Test populate_time_index=True migrates existing time keys into the sorted set
  • Test concurrent add_schedule / delete_schedule at the same minute does not lose index entries

- Introduced `populate_time_index` parameter to backfill the time index from existing keys.
- Updated `startup` method to populate the time index if `populate_time_index` is set to True.
- Modified schedule addition and deletion to manage the time index sorted set.
- Added tests to verify time index population and cleanup behavior.
- Added `_maybe_cleanup_time_index` method to manage time index cleanup at most once per minute.
- Introduced `_cleanup_time_index` method to remove stale entries older than one hour with empty time key lists.
- Updated `delete_schedule` to call `_maybe_cleanup_time_index` for efficient cleanup.
- Enhanced tests to verify the behavior of the new cleanup methods, ensuring proper handling of stale and recent entries.
…ameter

- Updated the _get_previous_time_schedules method to take current_time as an argument, allowing for more precise cutoff calculations.
- Adjusted the logic to use the provided current_time for determining previous schedules, ensuring no overlap with the current window.
- Modified the call to _get_previous_time_schedules in the first run logic to pass the current_time parameter.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant